package mr

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
	"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
	Key   string
	Value string
}

// for sorting by key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// 生成一个workerId
func genWorkerId() (uuid string) {
	unix32bits := uint32(time.Now().UTC().Unix())
	buff := make([]byte, 12)
	n, err := rand.Read(buff)
	if n != len(buff) || err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x-%x-%x-%x-%x-%x\n", unix32bits, buff[0:2], buff[2:4], buff[4:6], buff[6:8], buff[8:])
}

//
// main/mrworker.go calls this function.
// 先发一个rpc过去给master，master给worker分配工作
// worker从taskInfo中获取到master回复的信息
// 然后决定接下来要做啥工作/或者不做工作
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	// uncomment to send the Example RPC to the master.
	// CallExample()
	// 随机生成一个工作者ID
	workerId := genWorkerId()
	// 重试次数
	// worker没有任务时会重复询问master三次，想知道是否还有工作要做
	retry := 3

	// 启动无限循环
	for {
		args := WorkArgs{WorkerId: workerId}
		taskInfo := TaskInfo{}
		// call成功会返回一个true，说明这个任务已经在工作中了
		working := call("Master.Work", &args, &taskInfo)

		// 如果任务已经完成，或者call没有成功，那么直接退出该函数
		if taskInfo.IsFinished || !working {
			return
		}

		// 根据master分配的任务来决定调用哪个函数
		switch taskInfo.MapReduce {
		case "map":
			// 这个调用是阻塞调用
			MapWork(taskInfo, mapf)
			// worker完成了一个任务之后，恢复到没有任务的状态，因此把retry重新初始化为3
			retry = 3
		case "reduce":
			ReduceWork(taskInfo, reducef)
			retry = 3
		default:
			if retry < 0 {
				return
			}
			retry--
		}

		// worker完成了以上的任务了，通过rpc通知master
		commitInfo := CommitInfo{
			WorkerId:  workerId,
			TaskId:    taskInfo.TaskId,
			MapReduce: taskInfo.MapReduce,
		}

		// master通过该参数通知worker，提交成功
		commitReply := CommitReply{}
		call("Master.Commit", &commitInfo, &commitReply)
		// 设置一个轮询间隔时间，降低cpu负载
		time.Sleep(500 * time.Millisecond)
	}

}

func MapWork(task TaskInfo, mapf func(string, string) []KeyValue) {
	// 获取文件句柄
	f, err := os.Open(task.FileName)
	if err != nil {
		log.Fatalf("cannot open %v", task.FileName)
	}
	defer f.Close()

	content, err := ioutil.ReadAll(f)
	if err != nil {
		log.Fatalf("cannot read %v", task.FileName)
	}

	// 从content中读取到一个kv切片
	kvArr := mapf(task.FileName, string(content))
	// 先把kvArr强转成ByKey类型，然后排序
	sort.Sort(ByKey(kvArr))

	// 生成map输出的文件名
	tmpName := "mr-tmp-" + strconv.Itoa(task.TaskId)

	// json.Encoder: An Encoder writes JSON values to an output stream.
	var fileWriters = make([]*json.Encoder, task.FileNumber)
	for i := 0; i < task.FileNumber; i++ {
		// os.Create()函数保证每次打开的文件都是空白的，并且获取指向文件头部的句柄
		ofile, _ := os.Create(tmpName + "-" + strconv.Itoa(i))
		defer ofile.Close()
		// 装饰器模式，把文件句柄包装一下，让其拥有json的写入方法
		fileWriters[i] = json.NewEncoder(ofile)
	}

	for _, kv := range kvArr {
		key := kv.Key
		// 计算出该key对应输出到第几个文件
		i := ihash(key) % task.FileNumber
		// 把对应的kv写入第i个文件中
		err := fileWriters[i].Encode(&kv)
		if err != nil {
			log.Fatalf("Unable to write to the file: %v", tmpName+"-"+strconv.Itoa(i))
		}
	}
}

func ReduceWork(task TaskInfo, reducef func(string, []string) string) {
	intermediate := []KeyValue{}

	// 读取该reduce任务所对应的所有tmp文件
	for i := 0; i < task.FileNumber; i++ {
		// 文件名的格式: mr-tmp-mapN-redN
		// 此处的taskId已经是reduce任务的id了
		filename := "mr-tmp-" + strconv.Itoa(i) + "-" + strconv.Itoa(task.TaskId)
		f, err := os.Open(filename)
		if err != nil {
			log.Fatal("Unable to read from:", filename)
		}
		defer f.Close()

		decoder := json.NewDecoder(f)

		// 把kv键值对从文件中读出，放到intermediate中
		var kv KeyValue
		for decoder.More() {
			err := decoder.Decode(&kv)
			if err != nil {
				log.Fatal("Json decode failed,", err)
			}
			intermediate = append(intermediate, kv)
		}
	}

	// 排序
	sort.Sort(ByKey(intermediate))
	// 创建最终输出文件
	ofile, err := os.Create("mr-out-" + strconv.Itoa(task.TaskId+1))
	if err != nil {
		log.Fatal("Unable to create file:", ofile)
	}
	defer ofile.Close()

	// 统计每个key出现的次数
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		// 存储相同key对应的值的数组
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		// 这是一个string类型的数字
		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
}

//
// example function to show how to make an RPC call to the master.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {

	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	call("Master.Example", &args, &reply)

	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
